package com.mimo.comet.user.service.impl;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import com.mimo.comet.config.LocalCometServerConfig;
import com.mimo.comet.config.LogicMsgPoolFactory;
import com.mimo.comet.config.LogicUserPoolFactory;
import com.mimo.comet.constants.UserKeys;
import com.mimo.comet.dao.ISessionDao;
import com.mimo.comet.provider.ISession;
import com.mimo.comet.provider.command.AckCommand;
import com.mimo.comet.provider.command.echo.ErrorEchoCommand;
import com.mimo.comet.user.service.IUserService;
import com.mimo.common.comet.dto.user.UserLoginReq;
import com.mimo.common.configuration.mrlock.annotation.DistributedRedisLock;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.logic.dto.msg.BaseDispatchMessage;
import com.mimo.common.rpc.proto.BaseResponseProto.BaseResponse;
import com.mimo.common.rpc.proto.DispatchMessageProto;
import com.mimo.common.utils.JsonUtils;

@Service
public class UserServiceImpl implements IUserService {

  private static final Logger log = LoggerFactory.getLogger(UserServiceImpl.class);

  @Autowired
  private LogicUserPoolFactory logicUserPoolFactory;

  @Autowired
  private LogicMsgPoolFactory logicMsgPoolFactory;

  @Autowired
  private LocalCometServerConfig localCometServerConfig;

  @Autowired
  private ISessionDao sessionDao;

  @Resource
  private IUserService userSelfService;

  @Override
  public List<BaseDispatchMessage> getPendingMessage(String userId) {

    List<BaseDispatchMessage> msgs = Collections.emptyList();
    BaseResponse resp = logicMsgPoolFactory.getLogicMessageStub().getPendingMessage(StringValue.of(userId));
    if (resp.hasPluralityValue()) {
      msgs = resp.getPluralityValue().getItemsList().stream().map(any -> {
        try {
          return any.unpack(DispatchMessageProto.DispatchMessage.class);
        } catch (InvalidProtocolBufferException e) {
          throw new IllegalArgumentException(e);
        }
      }).map(BaseDispatchMessage::convert).collect(Collectors.toList());
    }
    return msgs;
  }

  @Override
  public void logout(String userId, String msgId) {
    // 模块通知退出
    logicUserPoolFactory.getLogicUserStub().logout(StringValue.of(userId));

    // 本地进行解绑
    userSelfService.unbinding(userId, msgId);
  }

  @Override
  @DistributedRedisLock(key = UserKeys.COMET_USER_LOCKER_PREFIX + "#{#req.uid}", expired = 60, waited = 60)
  public StatusCode login(UserLoginReq req) {
    Objects.requireNonNull(req);
    return StatusCode.from(logicUserPoolFactory.getLogicUserStub().login(UserLoginReq.convert(req)));
  }

  @Override
  @DistributedRedisLock(key = UserKeys.COMET_USER_LOCKER_PREFIX + "#{#userId}", expired = 60, waited = 60)
  public boolean binding(String userId, ISession session) {
    Objects.requireNonNull(userId);
    Objects.requireNonNull(session);
    Assert.isTrue(Objects.equals(userId, session.getSessionId()), "用户信息不一致");

    boolean successful = false;

    if (!sessionDao.findByUid(userId).isPresent()) {
      sessionDao.create(session);
      successful = true;
    }
    return successful;
  }

  @DistributedRedisLock(key = UserKeys.COMET_USER_LOCKER_PREFIX + "#{#userId}", expired = 60, waited = 60)
  @Override
  public void unbinding(String userId, @Nullable String msgId) {
    Objects.requireNonNull(userId);
    Optional<ISession> opt = sessionDao.remove(userId);
    if (opt.isPresent()) {
      if (StringUtils.hasText(msgId)) {
        opt.get().write(JsonUtils.toJsonString(new AckCommand(msgId)));
      }
      opt.get().close();
      log.info("ApplicationId:{}->成功解绑用户[{}]本地会话", localCometServerConfig.getApplicationId(), userId);
    }
  }

  @DistributedRedisLock(key = UserKeys.COMET_USER_LOCKER_PREFIX + "#{#userId}", expired = 60, waited = 60)
  @Override
  public void unbinding(String userId, @Nullable String msgId, String deviceId) {
    Objects.requireNonNull(userId);
    Optional<ISession> opt = sessionDao.remove(userId);
    if (opt.isPresent()) {
      if (StringUtils.hasText(msgId)) {
        opt.get().write(JsonUtils.toJsonString(new AckCommand(msgId)));
      }

      // 如果解绑时
      if (!Objects.equals(deviceId, opt.get().getAttribute("deviceId"))) {
        ErrorEchoCommand errCmd = new ErrorEchoCommand();
        errCmd.setCode(StatusCode.ForceLogout.getCode());
        errCmd.setMsg(StatusCode.ForceLogout.getMsg());
        opt.get().write(JsonUtils.toJsonString(errCmd));
      }

      opt.get().close();
      log.info("ApplicationId:{},zone:{}->成功解绑用户[{}]本地会话", localCometServerConfig.getApplicationId(),
          localCometServerConfig.getZone(), userId);
    }
  }

}
